<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
  <!-- h1>Developer Guide for Kafka Streams</h1 -->
  <div class="sub-nav-sticky">
    <div class="sticky-top">
      <!-- div style="height:35px">
        <a href="/{{version}}/documentation/streams/">Introduction</a>
        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
      </div -->
    </div>
  </div>

    <div class="section" id="streams-security">
        <span id="streams-developer-guide-security"></span><h1>Streams Security<a class="headerlink" href="#streams-security" title="Permalink to this headline"></a></h1>
        <div class="contents local topic" id="table-of-contents">
            <p class="topic-title first"><b>Table of Contents</b></p>
            <ul class="simple">
                <li><a class="reference internal" href="#required-acl-setting-for-secure-kafka-clusters" id="id1">Required ACL setting for secure Kafka clusters</a></li>
                <li><a class="reference internal" href="#security-example" id="id2">Security example</a></li>
            </ul>
        </div>
        <p>Kafka Streams natively integrates with <a class="reference internal" href="../../documentation.html#security"><span class="std std-ref">Kafka&#8217;s security features</span></a> and supports all of the
            client-side security features in Kafka.  Streams leverages the <a class="reference internal" href="../../clients/index.html#kafka-clients"><span class="std std-ref">Java Producer and Consumer API</span></a>.</p>
        <p>To secure your stream processing applications, configure the security settings in the corresponding Kafka producer
            and consumer clients, and then specify the corresponding configuration settings in your Kafka Streams application.</p>
        <p>Kafka supports cluster encryption and authentication, including a mix of authenticated and unauthenticated,
            and encrypted and non-encrypted clients. Using security is optional.</p>
        <p>Here are a few relevant client-side security features:</p>
        <dl class="docutils">
            <dt>Encrypt data-in-transit between your applications and Kafka brokers</dt>
            <dd>You can enable the encryption of the client-server communication between your applications and the Kafka brokers.
                For example, you can configure your applications to always use encryption when reading and writing data to and from
                Kafka. This is critical when reading and writing data across security domains such as internal network, public
                internet, and partner networks.</dd>
            <dt>Client authentication</dt>
            <dd>You can enable client authentication for connections from your application to Kafka brokers. For example, you can
                define that only specific applications are allowed to connect to your Kafka cluster.</dd>
            <dt>Client authorization</dt>
            <dd>You can enable client authorization of read and write operations by your applications. For example, you can define
                that only specific applications are allowed to read from a Kafka topic.  You can also restrict write access to Kafka
                topics to prevent data pollution or fraudulent activities.</dd>
        </dl>
        <p>For more information about the security features in Apache Kafka, see <a class="reference internal" href="../../documentation.html#security"><span class="std std-ref">Kafka Security</span></a>.</p>
        <div class="section" id="required-acl-setting-for-secure-kafka-clusters">
            <span id="streams-developer-guide-security-acls"></span><h2><a class="toc-backref" href="#id1">Required ACL setting for secure Kafka clusters</a><a class="headerlink" href="#required-acl-setting-for-secure-kafka-clusters" title="Permalink to this headline"></a></h2>
            <p>Kafka clusters can use ACLs to control access to resources (like the ability to create topics), and for such clusters each client,
                including Kafka Streams, is required to authenticate as a particular user in order to be authorized with appropriate access.
                In particular, when Streams applications are run against a secured Kafka cluster, the principal running the application must have
                the ACL set so that the application has the permissions to create, read and write
                <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-internal"><span class="std std-ref">internal topics</span></a>.</p>

            <p>Because all internal topics and the embedded consumer group name are prefixed with the <a class="reference internal" href="/{{version}}/documentation/streams/developer-guide/config-streams.html#required-configuration-parameters"><span class="std std-ref">application id</span></a>,
                it is recommended to grant ACLs on a prefixed resource pattern. This allows the client to manage all topics and
                consumer groups whose names start with this prefix, for example
                <code class="docutils literal"><span class="pre">--resource-pattern-type prefixed --topic your.application.id --operation All</span></code>
                (see <a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a>
                and <a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a> for details).
            </p>
        </div>
        <div class="section" id="security-example">
            <span id="streams-developer-guide-security-example"></span><h2><a class="toc-backref" href="#id2">Security example</a><a class="headerlink" href="#security-example" title="Permalink to this headline"></a></h2>
            <p>This example configures a Kafka Streams application to use client authentication and to encrypt data-in-transit when
                communicating with its Kafka cluster.</p>
            <p>This example assumes that the Kafka brokers in the cluster are already configured for security and that the necessary SSL
                certificates are available to the application in the local filesystem locations. For example, if you are using Docker,
                you must also include these SSL certificates in the correct locations within the Docker image.</p>
            <p>The snippet below shows the settings to enable client authentication and SSL encryption for data-in-transit between your
                Kafka Streams application and the Kafka cluster that it reads from and writes to:</p>
            <div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Essential security settings to enable client authentication and SSL encryption</span>
bootstrap.servers<span class="o">=</span>kafka.example.com:9093
security.protocol<span class="o">=</span>SSL
ssl.truststore.location<span class="o">=</span>/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password<span class="o">=</span>test1234
ssl.keystore.location<span class="o">=</span>/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password<span class="o">=</span>test1234
ssl.key.password<span class="o">=</span>test1234
</pre></div>
            </div>
            <p>Configure these settings in your application&#8217;s <code class="docutils literal"><span class="pre">Properties</span></code> instance. These settings will encrypt any
                data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the
                Kafka brokers that it is communicating with. Note that this example does not cover client authorization.</p>
            <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Code of your Java application that uses the Kafka Streams library</span>
<span class="n">Properties</span> <span class="n">settings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">APPLICATION_ID_CONFIG</span><span class="o">,</span> <span class="s">&quot;secure-kafka-streams-app&quot;</span><span class="o">);</span>
<span class="c1">// Where to find secure Kafka brokers.  Here, it&#39;s on port 9093.</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span class="s">&quot;kafka.example.com:9093&quot;</span><span class="o">);</span>
<span class="c1">//</span>
<span class="c1">// ...further non-security related settings may follow here...</span>
<span class="c1">//</span>
<span class="c1">// Security settings.</span>
<span class="c1">// 1. These settings must match the security settings of the secure Kafka cluster.</span>
<span class="c1">// 2. The SSL trust store and key store files must be locally accessible to the application.</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">CommonClientConfigs</span><span class="o">.</span><span class="na">SECURITY_PROTOCOL_CONFIG</span><span class="o">,</span> <span class="s">&quot;SSL&quot;</span><span class="o">);</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_TRUSTSTORE_LOCATION_CONFIG</span><span class="o">,</span> <span class="s">&quot;/etc/security/tls/kafka.client.truststore.jks&quot;</span><span class="o">);</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_TRUSTSTORE_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEYSTORE_LOCATION_CONFIG</span><span class="o">,</span> <span class="s">&quot;/etc/security/tls/kafka.client.keystore.jks&quot;</span><span class="o">);</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEYSTORE_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEY_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
</pre></div>
            </div>
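            <p>The passwords in the code above are hardcoded to keep the example short. As a minimal sketch, and only one of several
                possible approaches, the same settings could be kept in a properties file like the one shown earlier and loaded when the
                application starts. The class name <code class="docutils literal"><span class="pre">SecureStreamsSettings</span></code>, the helper
                <code class="docutils literal"><span class="pre">loadSettings</span></code>, and the default file path are hypothetical names chosen for illustration:</p>
            <div class="highlight-java"><div class="highlight"><pre><span></span>import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class SecureStreamsSettings {

    // Loads client settings (for example, the security settings shown earlier) from a properties file.
    static Properties loadSettings(Path file) throws IOException {
        Properties settings = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            settings.load(reader);
        }
        return settings;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default location; pass the real path as the first argument.
        Path file = Paths.get(args.length == 0 ? "/etc/kafka/streams-security.properties" : args[0]);
        Properties settings = loadSettings(file);
        // "application.id" is the key behind StreamsConfig.APPLICATION_ID_CONFIG.
        settings.put("application.id", "secure-kafka-streams-app");
        System.out.println("security.protocol = " + settings.getProperty("security.protocol"));
    }
}
</pre></div>
            </div>
            <p>The resulting <code class="docutils literal"><span class="pre">Properties</span></code> instance can then be passed to your Kafka Streams application in the same
                way as the <code class="docutils literal"><span class="pre">settings</span></code> object above. Keeping the file outside the application binary lets you restrict
                its filesystem permissions separately.</p>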
            <p>If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you
                start it.  For example, if you enter an incorrect password for the <code class="docutils literal"><span class="pre">ssl.keystore.password</span></code> setting, an error message
                similar to this would be logged and then the application would terminate:</p>
            <div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Misconfigured ssl.keystore.password</span>
Exception in thread <span class="s2">&quot;main&quot;</span> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
<span class="o">[</span>...snip...<span class="o">]</span>
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
   java.io.IOException: Keystore was tampered with, or password was incorrect
<span class="o">[</span>...snip...<span class="o">]</span>
Caused by: java.security.UnrecoverableKeyException: Password verification failed
</pre></div>
            </div>
            <p>Monitor your Kafka Streams application log files for such error messages to spot any misconfigured applications quickly.</p>
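            <p>A misconfigured keystore password can also be caught before the application starts. The following is a minimal sketch
                that uses only the JDK <code class="docutils literal"><span class="pre">java.security.KeyStore</span></code> API and is not part of Kafka Streams. The class name
                <code class="docutils literal"><span class="pre">KeystoreCheck</span></code> and the helper <code class="docutils literal"><span class="pre">canOpen</span></code> are hypothetical, and the
                <code class="docutils literal"><span class="pre">JKS</span></code> store type is an assumption based on the <code class="docutils literal"><span class="pre">.jks</span></code> file names used in this example:</p>
            <div class="highlight-java"><div class="highlight"><pre><span></span>import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class KeystoreCheck {

    // Returns true if the keystore file can be opened with the given password.
    static boolean canOpen(Path keystore, String type, char[] password) throws GeneralSecurityException {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(keystore)) {
            store.load(in, password);
            return true;
        } catch (IOException e) {
            // Raised for a missing or unreadable file, a corrupted store, or an incorrect password.
            return false;
        }
    }

    public static void main(String[] args) throws GeneralSecurityException {
        // Path and password taken from the example settings above; adjust them for your environment.
        Path keystore = Paths.get("/etc/security/tls/kafka.client.keystore.jks");
        boolean ok = canOpen(keystore, "JKS", "test1234".toCharArray());
        System.out.println("Keystore can be opened: " + ok);
    }
}
</pre></div>
            </div>
            <p>This check only covers the keystore password. Other misconfigurations, such as a wrong
                <code class="docutils literal"><span class="pre">security.protocol</span></code> value, still surface at runtime in the application log.</p>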
</div>
</div>


               </div>
              </div>
              <div class="pagination">
                <a href="/{{version}}/documentation/streams/developer-guide/manage-topics" class="pagination__btn pagination__btn__prev">Previous</a>
                <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool" class="pagination__btn pagination__btn__next">Next</a>
              </div>
                </script>

                <!--#include virtual="../../../includes/_header.htm" -->
                <!--#include virtual="../../../includes/_top.htm" -->
                    <div class="content documentation documentation--current">
                    <!--#include virtual="../../../includes/_nav.htm" -->
                    <div class="right">
                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
                    <ul class="breadcrumbs">
                    <li><a href="/documentation">Documentation</a></li>
                    <li><a href="/documentation/streams">Kafka Streams</a></li>
                    <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
                </ul>
                <div class="p-content"></div>
                    </div>
                    </div>
                    <!--#include virtual="../../../includes/_footer.htm" -->
                    <script>
                    $(function() {
                        // Show selected style on nav item
                        $('.b-nav__streams').addClass('selected');

                        //sticky secondary nav
                        var $navbar = $(".sub-nav-sticky"),
                            y_pos = $navbar.offset().top,
                            height = $navbar.height();

                        $(window).scroll(function() {
                            var scrollTop = $(window).scrollTop();

                            if (scrollTop > y_pos - height) {
                                $navbar.addClass("navbar-fixed")
                            } else if (scrollTop <= y_pos) {
                                $navbar.removeClass("navbar-fixed")
                            }
                        });

                        // Display docs subnav items
                        $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
                    });
              </script>
